package com.isyscore.os.metadata.kettle.step;

import com.isyscore.os.core.util.ApplicationUtils;
import com.isyscore.os.metadata.enums.DataSourceTypeEnum;
import com.isyscore.os.metadata.enums.KettleDataFlowNodeType;
import com.isyscore.os.metadata.kettle.base.AbstractStep;
import com.isyscore.os.metadata.kettle.base.FlowConfig;
import com.isyscore.os.metadata.kettle.base.FlowConnection;
import com.isyscore.os.metadata.kettle.base.FlowMetaCodec;
import com.isyscore.os.metadata.kettle.vis.MetricNode;
import com.isyscore.os.metadata.kettle.vis.VisNode;
import com.isyscore.os.metadata.model.dto.DataSourceDTO;
import com.isyscore.os.metadata.model.entity.Metric;
import com.isyscore.os.metadata.service.impl.MetricService;
import com.isyscore.os.metadata.utils.StringEscapeHelper;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.plugins.PluginInterface;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;

import java.util.Arrays;
import java.util.List;

public class MetricStep extends AbstractStep {
    private static String USER = ApplicationUtils.getProperty("spring.datasource.dynamic.datasource.mysql.username");
    private static String PASS = ApplicationUtils.getProperty("spring.datasource.dynamic.datasource.mysql.password");
    private static String IP ="mysql-service";
    private static String PORT="23306";
    private static String DB_NAME="isc_udmp";
    @Override
    public StepMeta decode(VisNode step, List<DatabaseMeta> databases, TransMeta transMeta, FlowConfig transGraph) throws Exception {
        MetricNode node = (MetricNode)step;
        MetricService metricService = ApplicationUtils.getBean(MetricService.class);
        //获取数据源ID
        Metric metric = metricService.getMetricById(Long.parseLong(node.getMetricId()));
        DataSourceDTO dataSource = new DataSourceDTO();
        dataSource.setDataSourceId(0L);
        dataSource.setDatabaseName(DB_NAME);
        dataSource.setType(DataSourceTypeEnum.MYSQL.getCode());
        dataSource.setIp(IP);
        dataSource.setPort(PORT);
        dataSource.setUserName(USER);
        dataSource.setPassword(PASS);
        //构建数据源信息
        FlowMetaCodec flowMetaCodec = ApplicationUtils.getBean(FlowMetaCodec.class);
        FlowConnection connection = new FlowConnection();
        connection.fill(dataSource,DB_NAME);
        flowMetaCodec.decodeDatabases(Arrays.asList(connection),transMeta);
        String sql = "select * from metric_job_batch_result where metric_ref_id='"+metric.getId()+"'";
        PluginRegistry registry = PluginRegistry.getInstance();
        PluginInterface sp = registry.findPluginWithId( StepPluginType.class, KettleDataFlowNodeType.TableInput.name() );
        StepMetaInterface stepMetaInterface = (StepMetaInterface) registry.loadClass( sp );

        TableInputMeta tableInputMeta = (TableInputMeta) stepMetaInterface;
        tableInputMeta.setDataBaseName(DB_NAME);
        tableInputMeta.setDatabaseMeta(DatabaseMeta.findDatabase(transMeta.getDatabases(), String.valueOf("0")));
        tableInputMeta.setSQL(StringEscapeHelper.decode(sql));
        tableInputMeta.setRowLimit("0");

        tableInputMeta.setExecuteEachInputRow(false);
        tableInputMeta.setVariableReplacementActive(true);
        tableInputMeta.setLazyConversionActive(false);

        StepMeta stepMeta = new StepMeta(KettleDataFlowNodeType.TableInput.name(), node.getNodeName(), stepMetaInterface);
        stepMeta.setDraw(true);
        return stepMeta;
    }

    @Override
    public StepMeta after(VisNode step, List<DatabaseMeta> databases, TransMeta transMeta, FlowConfig transGraph) throws Exception {

        return null;
    }
}
